package cn.concurrent.notify;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Producer/consumer demo built on {@link BlockingQueue}.
 *
 * <p>Five producer tasks put random integers into a shared bounded queue while ten
 * consumer tasks take them out; every put/take is reported on standard output.
 *
 * @Author: lz
 * @Date: 2018/9/3 17:26
 * @Version 1.0
 */
public class ProductorConsumerByBlockingQuene {

	/** Maximum number of elements the shared queue holds before {@code put} blocks. */
	private static final int QUEUE_CAPACITY = 100;

	/** Number of producer tasks started by {@link #main(String[])}. */
	private static final int PRODUCER_COUNT = 5;

	/** Number of consumer tasks started by {@link #main(String[])}. */
	private static final int CONSUMER_COUNT = 10;

	/**
	 * Shared queue. It is given an explicit capacity: the no-arg constructor uses
	 * {@code Integer.MAX_VALUE}, so producers would practically never block.
	 */
	private static final BlockingQueue<Integer> QUEUE = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

	/**
	 * Starts the producers and consumers on a fixed thread pool.
	 *
	 * <p>{@code BlockingQueue} provides two blocking operations: {@code put} blocks the
	 * inserting thread while the queue is full, and {@code take} blocks the retrieving
	 * thread while the queue is empty.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// One thread per task so that every producer and consumer runs concurrently.
		ExecutorService service = Executors.newFixedThreadPool(PRODUCER_COUNT + CONSUMER_COUNT);
		for (int i = 0; i < PRODUCER_COUNT; i++) {
			service.submit(new Productor(QUEUE));
		}
		for (int i = 0; i < CONSUMER_COUNT; i++) {
			service.submit(new Consumer(QUEUE));
		}
		// Reject further submissions; the tasks submitted above keep running.
		service.shutdown();
	}


	/** Task that keeps putting random integers into the queue until its thread is interrupted. */
	static class Productor implements Runnable {

		private final BlockingQueue<Integer> queue;

		/**
		 * @param queue queue the produced values are put into
		 */
		public Productor(BlockingQueue<Integer> queue) {
			this.queue = queue;
		}

		/** Produces values in a loop; returns once the running thread has been interrupted. */
		@Override
		public void run() {
			// Created once per task instead of once per produced value.
			Random random = new Random();
			try {
				while (!Thread.currentThread().isInterrupted()) {
					int i = random.nextInt();
					System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);
					queue.put(i);
				}
			} catch (InterruptedException e) {
				// Restore the interrupt status so the code running this task can observe it.
				Thread.currentThread().interrupt();
			}
		}
	}

	/** Task that keeps taking integers from the queue until its thread is interrupted. */
	static class Consumer implements Runnable {
		private final BlockingQueue<Integer> queue;

		/**
		 * @param queue queue the consumed values are taken from
		 */
		public Consumer(BlockingQueue<Integer> queue) {
			this.queue = queue;
		}

		/** Consumes values in a loop; returns once the running thread has been interrupted. */
		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					Integer element = queue.take();
					System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);
				}
			} catch (InterruptedException e) {
				// Restore the interrupt status so the code running this task can observe it.
				Thread.currentThread().interrupt();
			}
		}
	}

}

